package com.strongbj.iot.devices.amazondb.request.handle;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import javax.annotation.PreDestroy;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import com.alibaba.fastjson.JSONObject;
import com.strongbj.core.message.IMessageHandle;
import com.strongbj.iot.devices.amazonreader.request.message.MQBodyMessage;

import io.netty.channel.ChannelHandlerContext;

public class InventoryHandle implements IMessageHandle<MQBodyMessage<Object>, Object> {
	private static Logger logger = LogManager.getLogger(InventoryHandle.class.getName());
	private final static String ACTION_CODE = "inventoryTag";
	private final static Map<String,Set<String>> tagsMap = new ConcurrentHashMap<>();
	protected ExecutorService sendThreadPool = Executors.newSingleThreadExecutor();

	@Override
	public boolean isHandle(MQBodyMessage<Object> t) {
		if (t.getActioncode().equals(ACTION_CODE)) {
			return true;
		} else {
			return false;
		}
	}

	@Override
	public Object handle(ChannelHandlerContext ctx, MQBodyMessage<Object> t) {
		Set<String> onLineTagSet = new ConcurrentSkipListSet<>();
		Set<String> offlineTabSet = new ConcurrentSkipListSet<>();
		tagsMap.put(t.getGUID()+"_online", onLineTagSet);
		tagsMap.put(t.getGUID()+"_offline", offlineTabSet);
		JSONObject data = (JSONObject) t.getPostdata();
		String[] readerCodes = data.getObject("readerCode", String[].class);
		for(Object readerCode:readerCodes) {
			sendThreadPool.execute(new InventoryRunnable(String.valueOf(readerCode), t,tagsMap));
		}
		sendThreadPool.execute(new PushInventoryToMQRunnable(t.getGUID(),readerCodes,tagsMap));
		return null;
	}

	@PreDestroy
	public void destory() {
		try {
			sendThreadPool.shutdown(); // 发送关闭线程请求

			// 超时后向所有线程发出中断信号
			if (!sendThreadPool.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
				sendThreadPool.shutdownNow();
			}
		} catch (InterruptedException e) {
			logger.error("", e);
			sendThreadPool.shutdownNow();
		}
	}
}
